package com.yanggu.bigdata.realtime.app.function

import com.alibaba.fastjson.{JSON, JSONException, JSONObject}
import com.yanggu.bigdata.realtime.app.OutputTagObject.errorJsonDataOutputTag
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class ParseJsonStrProcessFunction extends ProcessFunction[String, JSONObject] {

  override def processElement(value: String, ctx: ProcessFunction[String, JSONObject]#Context, out: Collector[JSONObject]): Unit = {
    var jsonObject: JSONObject = null
    try {
      jsonObject = JSON.parseObject(value)
    } catch {
      case _: JSONException =>
        ctx.output(errorJsonDataOutputTag, value)
    }
    out.collect(jsonObject)
  }

}
